- Make executors immutable: with_options() always returns a new instance (copy.copy for base, new LocalExecutor() for local, RayExecutor already did this)
- Remove execution_engine_opts from FunctionNode; pipeline executor assignment logic is now the sole owner of per-node configuration
- Add type-safe executor dispatch via Generic[E] + __init_subclass__ on PacketFunctionBase: resolves executor protocol once at class definition time, validates at set_executor() instead of in the hot path
- Add PythonFunctionExecutorProtocol with execute_callable / async_execute_callable: executors receive raw callables + kwargs instead of packet_function + packet objects
- PythonPacketFunction now routes call() / async_call() through execute_callable, keeping packet construction in the function
- Add CachedFunctionPod: pod-level caching wrapper that intercepts process_packet() with tag+packet content hash as cache key
- Add pod_cache_database parameter to function_pod decorator

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
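The immutability rule for with_options() can be illustrated with a minimal sketch. SketchExecutor, its options property, and the num_cpus option are hypothetical names used only for illustration; this is not the orcapod base executor, only one way a copy.copy-based with_options() might look.

```python
import copy


class SketchExecutor:
    """Hypothetical executor used for illustration; not the orcapod class."""

    def __init__(self, **options):
        self._options = dict(options)

    @property
    def options(self):
        # Return a copy so callers cannot mutate internal state
        return dict(self._options)

    def with_options(self, **overrides):
        # Shallow-copy the executor, then give the copy its own options dict
        clone = copy.copy(self)
        clone._options = {**self._options, **overrides}
        return clone


base = SketchExecutor(num_cpus=1)
tuned = base.with_options(num_cpus=4)
same = base.with_options()  # no overrides still yields a new instance
```

Because the original instance is never mutated, one executor can be handed to many nodes and specialized per node without the nodes affecting each other.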
- Generic[E] dispatch: wrapper doesn't resolve, protocol rejection for non-conforming executor, inactive function + executor returns None, async_call routes through async_execute_callable
- LocalExecutor callable: async fn via execute_callable, sync/async fn via async_execute_callable
- CachedFunctionPod: same tag+different packet cached separately, same tag+same packet is cache hit, inactive function doesn't store, output_schema delegation, dual caching (result_database + pod_cache_database)
- Pipeline: no opts uses engine directly (no with_options call)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
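The Generic[E] dispatch behavior exercised by these tests can be sketched roughly as follows. All class names here (CallableExecutor, FunctionBase, PyFunction, Wrapper, InlineExecutor) are hypothetical stand-ins, and the two-argument execute_callable is a simplification; the real PacketFunctionBase may resolve the protocol differently.

```python
from typing import Any, Callable, Generic, Protocol, TypeVar, get_args, runtime_checkable


@runtime_checkable
class CallableExecutor(Protocol):
    """Hypothetical protocol: an executor that runs a raw callable with kwargs."""

    def execute_callable(self, fn: Callable[..., Any], kwargs: dict) -> Any: ...


E = TypeVar("E")


class FunctionBase(Generic[E]):
    _executor_protocol = None  # resolved once per subclass, at class definition

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Inspect FunctionBase[...] in the bases; keep only concrete type arguments
        for base in getattr(cls, "__orig_bases__", ()):
            args = get_args(base)
            if args and isinstance(args[0], type):
                cls._executor_protocol = args[0]

    def __init__(self):
        self._executor = None

    def set_executor(self, executor):
        # Validate here, once, rather than on every call
        proto = self._executor_protocol
        if executor is not None and proto is not None and not isinstance(executor, proto):
            raise TypeError(f"{type(executor).__name__} does not satisfy {proto.__name__}")
        self._executor = executor


class PyFunction(FunctionBase[CallableExecutor]):
    def call(self, fn, **kwargs):
        return self._executor.execute_callable(fn, kwargs)


class Wrapper(FunctionBase[E]):
    """Stays generic: the TypeVar argument is not resolved to a protocol."""


class InlineExecutor:
    def execute_callable(self, fn, kwargs):
        return fn(**kwargs)


py_fn = PyFunction()
py_fn.set_executor(InlineExecutor())
total = py_fn.call(lambda a, b: a + b, a=1, b=2)
```

In this sketch a wrapper that forwards the type variable leaves the protocol unresolved, which corresponds to the "wrapper doesn't resolve" case in the test list.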
- CachedFunctionPod now aligns with CachedPacketFunction storage: stores function variation data, execution data, cache entry hash, and timestamp
- Cache entry hash computed from tag + system tags + input packet hash (matching pipeline record entry_id pattern), ensuring two rows with identical user tags but different system tags are cached separately
- FunctionNode.attach_databases() creates CachedFunctionPod wrapping the function pod instead of CachedPacketFunction wrapping the packet function
- FunctionNode.process_packet() delegates to CachedFunctionPod for result caching and separately records pipeline provenance entries
- CachedFunctionPod.async_process_packet() does sync DB caching + async computation via inner pod's async_process_packet
- add_pipeline_record() now explicitly extracts source columns using select() instead of rename-then-drop pattern
- iter_packets Phase 2 skip-check uses single cache entry hash
- Added DESIGN_ISSUES.md note (CFP1) about potential optimization of reusing entry_hash between CachedFunctionPod and add_pipeline_record

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
CachedFunctionPod now caches by input packet hash only: the function output depends solely on the packet, not the tag. Tag-level uniqueness (tag + system tags + input packet hash) is handled by FunctionNode's pipeline record (add_pipeline_record / compute_pipeline_entry_id).

iter_packets Phase 2 skip check now uses pipeline entry_ids (which include tag + system tags + packet hash) retrieved from the pipeline database, ensuring correct deduplication when the same packet appears with different tags/system_tags.

Also:
- Extracted compute_pipeline_entry_id() as a reusable method
- Updated DESIGN_ISSUES CFP1: shared ResultCache refactor suggestion
- Added TODO notes for match tier support (aligned with P6)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
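The split between the two keys can be made concrete with a small sketch. The helpers below are hypothetical and use SHA-256 over JSON as a stand-in hasher; the project's compute_pipeline_entry_id() and its packet hashing are not shown in this conversation and may differ in both inputs and encoding.

```python
import hashlib
import json


def _digest(payload) -> str:
    """Stable SHA-256 over a JSON-serializable payload (stand-in hasher)."""
    blob = json.dumps(payload, sort_keys=True).encode("utf-8")
    return hashlib.sha256(blob).hexdigest()


def packet_hash(packet: dict) -> str:
    # Result-cache key: depends on the input packet only
    return _digest(packet)


def pipeline_entry_id(tag: dict, system_tags: dict, packet: dict) -> str:
    # Provenance key: tag + system tags + input packet hash
    return _digest(
        {"tag": tag, "system_tags": system_tags, "packet_hash": packet_hash(packet)}
    )


rows = [
    ({"sample": "a"}, {"source": "run1"}, {"x": 1}),
    ({"sample": "a"}, {"source": "run2"}, {"x": 1}),  # same user tag, new system tag
    ({"sample": "b"}, {"source": "run1"}, {"x": 1}),  # new user tag, same packet
]

result_keys = {packet_hash(p) for _, _, p in rows}
entry_ids = {pipeline_entry_id(t, s, p) for t, s, p in rows}
```

Here the three rows share one result key but produce three distinct entry ids, so the function would run once while provenance is still recorded per row.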
New test file covering the dual-database caching architecture:
- compute_pipeline_entry_id: determinism, tag/packet sensitivity
- System tag awareness: identical user tags with different system tags produce different pipeline entry_ids
- Result DB vs pipeline DB record counts: same packet/different tags → 1 result record, N pipeline records
- Phase 1/2 with pipeline entry_ids: Phase 1 yields existing, Phase 2 skips matching entry_ids, processes only novel combinations
- Same packet + new tag triggers Phase 2 (novel entry_id) even though CachedFunctionPod has a result cache hit
- Pipeline records reference same result UUID for identical packets
- Pipeline records include source columns but not data columns of input

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
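The Phase 1 / Phase 2 behavior described in these tests might look roughly like the sketch below. iter_packets_sketch, the plain dicts standing in for the two databases, and the tuple used as an entry id are all assumptions made for illustration; the real iter_packets works against database records and hashed ids.

```python
def iter_packets_sketch(rows, pipeline_db, result_cache, fn):
    """Yield (entry_id, output) pairs; rows are (tag, system_tag, packet) tuples."""
    # Phase 1: replay every record the pipeline database already holds
    existing = list(pipeline_db.items())
    seen = {entry_id for entry_id, _ in existing}
    yield from existing

    # Phase 2: handle only entry ids that Phase 1 did not cover
    for tag, system_tag, packet in rows:
        entry_id = (tag, system_tag, packet)  # stand-in for a hashed entry id
        if entry_id in seen:
            continue
        if packet not in result_cache:  # result cache is keyed by packet only
            result_cache[packet] = fn(packet)
        pipeline_db[entry_id] = result_cache[packet]
        seen.add(entry_id)
        yield entry_id, pipeline_db[entry_id]


calls = []


def double(x):
    calls.append(x)  # track how often the function really runs
    return 2 * x


pipeline_db, result_cache = {}, {}
first = list(
    iter_packets_sketch([("a", "s1", 1), ("b", "s1", 1)], pipeline_db, result_cache, double)
)
second = list(
    iter_packets_sketch([("a", "s1", 1), ("a", "s2", 1)], pipeline_db, result_cache, double)
)
```

In the second pass the row with the new system tag is a novel entry id, so Phase 2 records it, but the result comes from the packet-keyed cache and the function is not called again.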
Pull request overview
This PR refactors the function execution and caching stack to make executors effectively immutable, move execution configuration ownership into the pipeline, and introduce pod-level result caching/provenance improvements (PLT-920).
Changes:
- Adds PythonFunctionExecutorProtocol + Generic[E]-based executor dispatch and shifts PythonPacketFunction execution to callable/kwargs routing.
- Introduces CachedFunctionPod and refactors FunctionNode persistence to cache by input packet hash while tracking pipeline provenance via (tag + system_tags + packet_hash) entry IDs.
- Removes per-node execution_engine_opts and applies pipeline-level execution_engine_opts via with_options().
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| tests/test_pipeline/test_pipeline.py | Updates pipeline DB/result path assertions; adds callable execution hooks and new pipeline engine opts tests. |
| tests/test_pipeline/test_node_descriptors.py | Updates serialized descriptor expectations to use CachedFunctionPod record paths. |
| tests/test_core/test_regression_fixes.py | Extends test executor to support execute_callable. |
| tests/test_core/packet_function/test_executor.py | Adds executor immutability tests; adds protocol conformance + generic dispatch tests; updates call-routing expectations. |
| tests/test_core/function_pod/test_function_pod_node.py | Updates result record path assertion to CachedFunctionPod. |
| tests/test_core/function_pod/test_function_node_caching.py | New integration tests for pipeline entry IDs, phase1/phase2 behavior, and record-count semantics. |
| tests/test_core/function_pod/test_function_node_attach_db.py | Updates attach-db behavior tests to assert CachedFunctionPod wrapping. |
| tests/test_core/function_pod/test_cached_function_pod.py | New unit tests for CachedFunctionPod caching semantics and decorator integration. |
| src/orcapod/types.py | Updates PipelineConfig docs for new execution_engine_opts semantics. |
| src/orcapod/protocols/core_protocols/executor.py | Tightens with_options() contract; introduces PythonFunctionExecutorProtocol. |
| src/orcapod/protocols/core_protocols/__init__.py | Exports PythonFunctionExecutorProtocol. |
| src/orcapod/pipeline/graph.py | Removes per-node engine opt merging; applies pipeline options via with_options(). |
| src/orcapod/core/packet_function.py | Adds Generic[E] resolution; adds set_executor() validation; routes Python execution via execute_callable. |
| src/orcapod/core/nodes/function_node.py | Replaces CachedPacketFunction persistence with CachedFunctionPod; adds entry-id-based two-phase iteration and pipeline record cleanup. |
| src/orcapod/core/function_pod.py | Adds decorator support for pod-level caching via pod_cache_database. |
| src/orcapod/core/executors/ray.py | Adds execute_callable / async_execute_callable implementations. |
| src/orcapod/core/executors/local.py | Adds callable execution implementations; updates with_options() to return new instance. |
| src/orcapod/core/executors/base.py | Makes with_options() return a new instance by default; adds default callable execution methods. |
| src/orcapod/core/cached_function_pod.py | New pod-level caching wrapper aligning stored columns with CachedPacketFunction. |
| DESIGN_ISSUES.md | Documents follow-up refactor opportunity to deduplicate cache logic. |
…on and CachedFunctionPod

New ResultCache class owns lookup, store, conflict resolution, and auto-flush logic. Both CachedPacketFunction and CachedFunctionPod delegate to a ResultCache instance. ResultCache.lookup accepts an additional_constraints dict, the hook for future match tier support (P6). Default lookup matches on INPUT_PACKET_HASH_COL only; additional constraints can narrow the match (e.g. by function variation hash). Resolves DESIGN_ISSUES CFP1.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
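As a rough illustration of the lookup contract described here, the sketch below keeps records in a list and filters them in memory. ResultCacheSketch, its column names, and the function_name constraint are hypothetical; the real ResultCache is database-backed and its signatures are not shown in this conversation.

```python
from datetime import datetime, timezone


class ResultCacheSketch:
    """Hypothetical in-memory stand-in for a database-backed result cache."""

    def __init__(self):
        self._records = []

    def store(self, input_packet_hash, output, timestamp=None, **extra):
        self._records.append(
            {
                "input_packet_hash": input_packet_hash,
                "output": output,
                "timestamp": timestamp or datetime.now(timezone.utc),
                **extra,
            }
        )

    def lookup(self, input_packet_hash, additional_constraints=None):
        # Default match is on the packet hash; extra constraints only narrow it
        constraints = {
            "input_packet_hash": input_packet_hash,
            **(additional_constraints or {}),
        }
        matches = [
            r for r in self._records
            if all(r.get(k) == v for k, v in constraints.items())
        ]
        if not matches:
            return None
        # Conflict resolution: the most recent timestamp wins
        return max(matches, key=lambda r: r["timestamp"])["output"]


cache = ResultCacheSketch()
old = datetime(2024, 1, 1, tzinfo=timezone.utc)
new = datetime(2024, 6, 1, tzinfo=timezone.utc)
cache.store("h1", "stale", timestamp=old, function_name="f")
cache.store("h1", "fresh", timestamp=new, function_name="g")
```

With these two records, an unconstrained lookup of "h1" returns the newer output, while constraining on function_name="f" narrows the match to the older record.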
- Lookup: miss on empty DB, miss on different packet, hit returns correct result with RESULT_COMPUTED_FLAG=False, different record paths are isolated
- Conflict resolution: most recent timestamp wins
- Additional constraints: non-matching constraint filters out, matching constraint (e.g. function_name) returns result
- Store: input_packet_hash, variation, execution, timestamp, output data columns all present
- Auto flush: default true, set_auto_flush, constructor param
- get_all_records: empty returns None, includes/excludes system columns

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- PythonFunctionExecutorProtocol now extends PacketFunctionExecutorProtocol (fixes type bound mismatch and ensures supports()/supported_function_type_ids are always available)
- PacketFunctionBase._executor typed as E | None (type-safe access to execute_callable on PythonPacketFunction without casts)
- LocalExecutor.execute_callable handles nested event loops (mirrors PythonPacketFunction._call_async_function_sync pattern)
- Pipeline._apply_execution_engine always calls with_options() per node; the executor decides what to copy vs share
- Fixed stale docstring (pod_cache_database: "input packet content hash" not "tag+packet hash")
- Fixed type annotations: list[str] | None in test helpers, list[Any] for mock executor call lists
- Updated pipeline tests to check node's executor (not original mock)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
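One common way to run an async callable from synchronous code when an event loop may already be running is sketched below. execute_callable_sketch is a hypothetical function; the worker-thread fallback is an assumption about the general technique, not a copy of LocalExecutor.execute_callable or _call_async_function_sync.

```python
import asyncio
import inspect
from concurrent.futures import ThreadPoolExecutor


def execute_callable_sketch(fn, kwargs):
    """Run fn(**kwargs) synchronously, whether fn is sync or async."""
    result = fn(**kwargs)
    if not inspect.iscoroutine(result):
        return result  # plain sync function: nothing more to do
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run is safe
        return asyncio.run(result)
    # A loop is already running here; asyncio.run would raise, so drive the
    # coroutine on a fresh loop in a worker thread and wait for it
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, result).result()


async def add(a, b):
    await asyncio.sleep(0)
    return a + b


def mul(a, b):
    return a * b


async def outer():
    # Sync entry point invoked from inside a running event loop
    return execute_callable_sketch(add, {"a": 1, "b": 1})


sync_result = execute_callable_sketch(mul, {"a": 2, "b": 3})
async_result = execute_callable_sketch(add, {"a": 2, "b": 3})
nested_result = asyncio.run(outer())
```

The nested case blocks the outer loop while the worker thread finishes, which is acceptable for a synchronous API but is a reason to prefer async_execute_callable when the caller is already async.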
Summary
Implements the function execution chain improvements outlined in PLT-920. Cleans up the logic around function node, function pod, packet function, and executor — making the executor receive raw callables rather than knowing about packet function internals, and separating result caching from pipeline provenance tracking.
Executor immutability & cleanup
- with_options() always returns a new instance (executors are immutable value objects)
- Pipeline always calls with_options() per node; the executor decides what to copy vs share
- Removed execution_engine_opts from FunctionNode; pipeline executor-assignment logic is the sole owner

Type-safe executor dispatch (Generic[E] + __init_subclass__)
- PacketFunctionBase is now Generic[E] with _executor: E | None; resolves executor protocol once at class definition time
- PythonFunctionExecutorProtocol extends PacketFunctionExecutorProtocol, adding execute_callable(fn, kwargs, executor_options) / async_execute_callable
- PythonPacketFunction routes call() / async_call() through execute_callable, keeping packet construction in the function
- PacketFunctionWrapper stays generic (PacketFunctionBase[E]), preserving executor type constraints

Shared ResultCache
- ResultCache class extracts shared lookup/store/conflict-resolution logic
- ResultCache.lookup accepts additional_constraints dict: extensibility hook for future match tier support (DESIGN_ISSUES P6)
- CachedPacketFunction and CachedFunctionPod delegate to a ResultCache instance; no duplicated caching logic

CachedFunctionPod + FunctionNode refactor
- CachedFunctionPod: pod-level caching wrapper that intercepts process_packet(), caches by input packet hash only (function output depends solely on packet, not tag)
- FunctionNode uses CachedFunctionPod for result caching, with separate pipeline provenance records via add_pipeline_record
- Pipeline entry_id is hash(tag + system_tags + input_packet_hash): ensures different source entries with same user tags are tracked separately
- iter_packets Phase 2 skip check uses pipeline entry_ids (not just packet hash)
- add_pipeline_record explicitly saves source columns of input packets (no data columns)
- compute_pipeline_entry_id() extracted as reusable method
- CachedFunctionPod.async_process_packet() does sync DB caching + async computation
- LocalExecutor.execute_callable handles nested event loops safely

Test plan
- ResultCache: lookup miss/hit, conflict resolution, additional_constraints filtering, store columns, auto-flush, get_all_records
- Executor immutability: with_options returns new instance, preserves state
- PythonFunctionExecutorProtocol conformance, execute_callable with sync/async functions
- Generic[E] dispatch: protocol resolution, rejection of non-conforming executors, inactive function + executor
- CachedFunctionPod: cache miss/hit, same packet different tags = cache hit (packet-only key), inactive function, dual caching, output_schema delegation
- […] with_options)

Closes PLT-920